JWT 的介绍
注意: JWT 不局限于在 Django 下使用,也可以在 Flask 等其他框架 或 其他语言下使用
1. JWT 的简介
- jwt(JSON Web Tokens),是一种开发的行业标准 RFC 7519 ,用于安全的表示双方之间的声明
- 目前,jwt广泛应用在系统的用户认证方面,特别是现在前后端分离项目
2. JWT 认证流程

在项目开发中,一般会按照上图所示的过程进行认证,即:用户登录成功之后,服务端给用户浏览器返回一个token,以后用户浏览器要携带token再去向服务端发送请求,服务端校验token的合法性,合法则给用户看数据,否则,返回一些错误信息
3. 传统 token方式 和 JWT 在认证方面有什么差异?
- 传统token方式
- 用户登录成功后,服务端生成一个随机 token 给用户,并且在服务端(数据库或缓存)中保存一份token,以后用户再来访问时需携带 token,服务端接收到 token 之后,去 数据库 或 缓存 中进行校验 token 的是否超时、是否合法
- jwt方式
- 用户登录成功后,服务端通过 jwt 生成一个随机 token 给用户(服务端无需保留token),以后用户再来访问时需携带 token,服务端接收到 token 之后,通过 jwt 对 token 进行校验是否超时、是否合法
JWT 的原理
1. 用户提交用户名和密码给客户端进行验证,如果登陆成功,使用 JWT 创建一个 token,并且返回给用户
- jwt 生成的 token 是由三段字符串组成的,并且用 “.” 连接起来
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c
- 第一段字符串: HEADER 部分
- 内容包括: 算法类型 和 token 类型
{
"alg": "HS256", # 算法类型
"typ": "JWT" # token 类型
}
- 然后,对 HEADER 部分进行 base64url 加密,然后得到第一部分的字符串(密文)
- base64url 加密是先做 base64加密,然后再将 - 替代 + 及 _ 替代 / 。
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9
- 第二段字符串: PAYLOAD 部分
- 一般存储着自定义的内容(即: 想返回给前端的内容)
{
"id": "1",
"name": "Kevin",
"exp": 1516239022 # token 的超时时间,一般都要有该属性
...
}
- 然后,对 PAYLOAD 部分进行 base64url 加密,然后得到第二部分的字符串(密文)
- base64url 加密是先做 base64加密,然后再将 - 替代 + 及 _ 替代 / 。
eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ
- 第三段字符串: SIGNATURE 部分
- 第一步: 将 第一段密文 和 第二段密文 用 “.” 拼接起来
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ
- 第二步: 对第一步拼接后得到的字符串进行 HS256 加密 + 加盐
- 第三步: 对 HS256 加密后的密文再做 base64url 加密,然后得到第三部分的字符串(密文)
SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c
- jwt 的源码表现
base64url(
HMACSHA256(
base64UrlEncode(header) + "." + base64UrlEncode(payload),
your-256-bit-secret (秘钥加盐)
)
)
2. 以后用户访问接口的时候都需要携带 token 进行访问,然后后端需要对 token 进行校验
- 校验 token 的第一步: 获取 token
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c
- 校验 token 的第二步: 以 “.” 作为分隔符对 token 进行分割
第一部分: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9
第二部分: eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ
第三部分: SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c
- 校验 token 的第三步: 对 token 的第二部分进行 base64url 解密,并获取到 PAYLOAD 信息(即: 自定义的内容信息,返回给前端的内容),然后检测 token 是否过期
{
"id": "1",
"name": "Kevin",
"exp": 1516239022 # token 的超时时间,一般都要有该属性
...
}
- 校验 token 的第四步:
- 第一步: 将 token 的第一部分字符串 和 token 的第一部分字符串 用 “.” 拼接起来
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ
- 第二步: 对第一步拼接后得到的字符串进行 HS256 加密 + 加盐,得到 密文一
- 第三步: 对 token 的第三部分进行 base64url 解密,得到 密文二
- 第四步: 判断 密文一 和 密文二 是否相等,如果相等,表示 token 未被修改过(认证通过)
JWT 的使用
1. JWT 的安装
pip3 install pyjwt -i https://pypi.douban.com/simple # 使用豆瓣的镜像
2. JWT 的使用(未对代码进行封装处理)
# views.py
from rest_framework.views import APIView
from rest_framework.response import Response
from app01 import models
from django.conf import settings
import jwt
from jwt import exceptions
import datetime
class LoginView(APIView):
def post(self, request):
username = request.data.get('username')
password = request.data.get('password')
user_obj = models.UserInfo.objects.filter(username=username, password=password).first()
if not user_obj:
return Response({'code': 1000, 'error': '用户名或密码错误'})
# -------------------- 使用 jwt 构造 token -----------------------
# 盐
SALT = settings.SECRET_KEY
# 构造 header
headers = {
'typ': 'jwt', # token 类型
'alg': 'HS256' # 算法类型
}
# 构造 payload
payload = {
'user_id': user_obj.pk, # 自定义用户id
'username': user_obj.username, # 自定义用户名
'exp': datetime.datetime.utcnow() + datetime.timedelta(minutes=1) # token 超时时间
}
token = jwt.encode(
headers=headers, # 头部信息
payload=payload, # 自定义内容(即: 返回给前端的内容)
key=SALT, # 盐,使用 django 配置文件中的 SECRET_KEY 作为盐
algorithm="HS256", # 加密方式
).decode('utf-8')
return Response({'code': 1001, 'token': token})
class OrderView(APIView):
def get(self, request):
# -------------------- 使用 jwt 验证 token -----------------------
# 从 url 参数中获取 token
token = request.query_params.get('token')
# 从 请求头 中获取 token
# token = request.META.get('HTTP_AUTHORIZATION')
# 盐
SALT = settings.SECRET_KEY
payload = None
msg = ''
try:
# 验证 token 是否正确,如果 token 校验成功会返回 payload 中的内容(即: 自定义的内容)
payload = jwt.decode(token, SALT, True) # True 表示进行 token 的校验
except exceptions.ExpiredSignatureError:
msg = 'token已失效(已过期)'
except jwt.DecodeError:
msg = 'token认证失败'
except jwt.InvalidTokenError:
msg = '非法的token'
if not payload:
return Response({'code': 1002, 'error': msg})
return Response({'code': 1003, 'data': payload})


3. JWT 的使用(对代码进行了封装处理)
- 获取 token
# utils/auth_token.py
import jwt
import datetime
from django.conf import settings
def get_token(payload, timeout):
"""
:param payload: 自定义的内容(即: 返回给前端的内容)
:param timeout: 超时时间
:return: token
"""
# 盐
SALT = settings.SECRET_KEY
# 构造 header
headers = {
'typ': 'jwt', # token 类型
'alg': 'HS256' # 算法类型
}
# 将超时时间添加到 payload 中
payload['exp'] = datetime.datetime.utcnow() + datetime.timedelta(minutes=timeout) # token 超时时间
token = jwt.encode(
headers=headers, # 头部信息
payload=payload, # 自定义内容(即: 返回给前端的内容)
key=SALT, # 盐,使用 django 配置文件中的 SECRET_KEY 作为盐
algorithm="HS256", # 加密方式
).decode('utf-8')
return token
- token 的校验
# extensions/auth.py
from rest_framework.authentication import BaseAuthentication
from rest_framework import exceptions as rf_exceptions
import jwt
from jwt import exceptions as jwt_exceptions
from django.conf import settings
class TokenAuth(BaseAuthentication):
def authenticate(self, request):
# 从 url 参数中获取 token
token = request.query_params.get('token')
# 从 请求头 中获取 token
# token = request.META.get('HTTP_AUTHORIZATION')
# 盐
SALT = settings.SECRET_KEY
try:
# 验证 token 是否正确,如果 token 校验成功会返回 payload 中的内容(即: 自定义的内容)
payload = jwt.decode(token, SALT, True) # True 表示进行 token 的校验
return payload, token
except jwt_exceptions.ExpiredSignatureError:
raise rf_exceptions.AuthenticationFailed({'code': 1003, 'msg': 'token已失效(已过期)'})
except jwt.DecodeError:
raise rf_exceptions.AuthenticationFailed({'code': 1004, 'msg': 'token认证失败'})
except jwt.InvalidTokenError:
raise rf_exceptions.AuthenticationFailed({'code': 1005, 'msg': '非法的token'})
- 视图
# views.py
from rest_framework.views import APIView
from rest_framework.response import Response
from app01 import models
from app01.utils.auth_token import get_token
from app01.extensions.auth import TokenAuth
class LoginView(APIView):
def post(self, request):
username = request.data.get('username')
password = request.data.get('password')
user_obj = models.UserInfo.objects.filter(username=username, password=password).first()
if not user_obj:
return Response({'code': 1000, 'error': '用户名或密码错误'})
# 获取 token
token = get_token({
'user_id': user_obj.pk, # 自定义用户id
'username': user_obj.username, # 自定义用户名
}, 1)
return Response({'code': 1001, 'token': token})
class OrderView(APIView):
authentication_classes = [TokenAuth]
def get(self, request):
return Response({
'code': 1006,
'data': {
'user_id': request.user['user_id'],
'username': request.user['username']
}
})


JWT 刷新 Token 的思路与实现
1. 思路
- 准备两个 token
- token(过期时间短) -> 用于平常的验证过期
- refresh_token(过期时间长) -> refresh_token 用于当 token 过期时,换取新的 token 值,以及一个新的 refresh_token 值
- 当客户端在发送请求前,需要检测 token 是否过期
- 情况一: 当 token 没有过期,继续发送请求
- 情况二: 当 token 已过期,发送刷新 token 的请求,根据返回的状态码进行判断是否继续请求或重新登陆
- ① 如果后端检测 refresh_token 正确且没有过期,返回正确的状态码,然后前端更新 Vuex 中的 token 继续发送请求
- ② 如果后端检测 refresh_token 错误或者过期,返回错确的状态码,然后前端需要重新登陆获取 token
- 当服务端接收到 token 时
- 情况一: 当 token 正确且没有过期,返回对应的数据信息
- 情况二: 当 token 错误或者过期,返回错误的数据信息
- 当服务端刷新 token 是
- 情况一: 当 refresh_token 正确且没有过期,返回新的 token、refresh_token、token 过期时间
- 情况二: 当 refresh_token 错误或者过期,返回错误的数据信息
2. 后端实现
- 获取 token 和 token 过期时间
# utils/auth_token.py
import jwt
import datetime
from django.conf import settings
class MyToken(object):
def __init__(self):
self.typ = 'jwt' # token 类型
self.alg = "HS256" # 算法类型
self.SALT = settings.SECRET_KEY # 盐,使用 django 配置文件中的 SECRET_KEY 作为盐
# 获取 token 超时时间
def get_token_time(self, time_type, exp, is_utc=True):
if is_utc:
not_time = datetime.datetime.utcnow()
else:
not_time = datetime.datetime.now()
if time_type == 'days':
return not_time + datetime.timedelta(days=exp)
elif time_type == 'hours':
return not_time + datetime.timedelta(hours=exp)
elif time_type == 'minutes':
return not_time + datetime.timedelta(minutes=exp)
elif time_type == 'seconds':
return not_time + datetime.timedelta(seconds=exp)
# 获取 token 信息(包括: token、token 过期时间)
def get_token_info(self, payload, time_type, exp):
# 构造 header
headers = {
'typ': self.typ, # token 类型
'alg': self.alg # 算法类型
}
# 将超时时间添加到 payload 中
payload['exp'] = self.get_token_time(time_type, exp) # token 超时时间
# 获取不是 utc 的 token 超时时间
token_time = self.get_token_time(time_type, exp, is_utc=False)
token = jwt.encode(
headers=headers, # 头部信息
payload=payload, # 自定义内容(即: 返回给前端的内容)
key=self.SALT, # 盐,使用 django 配置文件中的 SECRET_KEY 作为盐
algorithm=self.alg, # 加密方式
).decode('utf-8')
return {
'token': token, # token
'token_time': token_time # token 过期时间
}
- token 的校验
# extensions/auth.py
from rest_framework.authentication import BaseAuthentication
from rest_framework import exceptions as rf_exceptions
import jwt
from jwt import exceptions as jwt_exceptions
from django.conf import settings
class TokenAuth(BaseAuthentication):
def authenticate(self, request):
# 从 url 参数中获取 token
token = request.query_params.get('token')
# 从 请求头 中获取 token
# token = request.META.get('HTTP_AUTHORIZATION')
# 盐
SALT = settings.SECRET_KEY
try:
# 验证 token 是否正确,如果 token 校验成功会返回 payload 中的内容(即: 自定义的内容)
payload = jwt.decode(token, SALT, True) # True 表示进行 token 的校验
return payload, token
except jwt_exceptions.ExpiredSignatureError:
raise rf_exceptions.AuthenticationFailed({'code': 1003, 'msg': 'token已失效(已过期)'})
except jwt.DecodeError:
raise rf_exceptions.AuthenticationFailed({'code': 1004, 'msg': 'token认证失败'})
except jwt.InvalidTokenError:
raise rf_exceptions.AuthenticationFailed({'code': 1005, 'msg': '非法的token'})
- 视图
# views.py
from rest_framework.views import APIView
from rest_framework.response import Response
from api import models
from api.utils.auth_token import MyToken
from api.extensions.auth import TokenAuth
import jwt
from jwt import exceptions
from django.conf import settings
# 登陆
class LoginView(APIView):
def post(self, request):
username = request.data.get('username')
password = request.data.get('password')
user_obj = models.UserInfo.objects.filter(username=username, password=password).first()
if not user_obj:
return Response({'code': 1000, 'error': '用户名或密码错误'})
# 获取 token
my_token = MyToken()
# 短时间的 token,用于校验 token
token_info = my_token.get_token_info(
payload={
'user_id': user_obj.pk, # 自定义用户id
'username': user_obj.username, # 自定义用户名
},
time_type='seconds',
exp=3
)
# 长时间的 token,用于更新 token
refresh_token_info = my_token.get_token_info(
payload={
'user_id': user_obj.pk, # 自定义用户id
'username': user_obj.username, # 自定义用户名
},
time_type='seconds',
exp=5
)
return Response({'code': 1001, 'data': {
'token': token_info['token'],
'refresh_token': refresh_token_info['token'],
'token_time': token_info['token_time'],
'+'}'+'}'}})
# 更新 Token
class UpdateTokenView(APIView):
def post(self, request):
# 获取请求头中 token
refresh_token = request.META.get('HTTP_REFRESHAUTHORIZATION')
# 盐
SALT = settings.SECRET_KEY
payload = None
msg = ''
try:
# 验证 token 是否正确,如果 token 校验成功会返回 payload 中的内容(即: 自定义的内容)
payload = jwt.decode(refresh_token, SALT, True) # True 表示进行 token 的校验
# 获取 token
my_token = MyToken()
# 短时间的 token,用于校验 token
token_info = my_token.get_token_info(
payload={
'user_id': payload['user_id'], # 自定义用户id
'username': payload['username'], # 自定义用户名
},
time_type='seconds',
exp=3
)
# 长时间的 token,用于更新 token
refresh_token_info = my_token.get_token_info(
payload={
'user_id': payload['user_id'], # 自定义用户id
'username': payload['username'], # 自定义用户名
},
time_type='seconds',
exp=5
)
return Response({
'code': 1007,
'data': {
'token': token_info['token'],
'refresh_token': refresh_token_info['token'],
'token_time': token_info['token_time'],
}
})
except exceptions.ExpiredSignatureError:
msg = 'token已失效(已过期)'
except jwt.DecodeError:
msg = 'token认证失败'
except jwt.InvalidTokenError:
msg = '非法的token'
if not payload:
return Response({'code': 1008, 'error': msg})
# 订单
class OrderView(APIView):
authentication_classes = [TokenAuth]
def get(self, request):
return Response({
'code': 1006,
'data': '订单数据'
})
2. 前端实现(Vue)
- 通过 axios 拦截器进行实现,在每次请求前验证 token 是否过期,如果已过期那么就刷新 token 后再发送请求
- 注意: 如果出现了跨域问题,一定要用前端方法进行解决,不要使用后端方法进行解决,否则会没有效果
- 使用前需要修改的地方
- 拦截器中的白名单
- 拦截器中的刷新token的请求网址
- 拦截器中刷新token请求成功后返回的状态码
# src/api/ajax.js
import axios from 'axios'
import store from '../store'
import this_vue from '../main'
import {RECEIVE_TOKEN} from "../store/mutations-types";
// axios.defaults.baseURL = 'http://127.0.0.1:8010/api';
axios.defaults.baseURL = `./api`;
// axios.defaults.headers.post['Content-Type'] = 'application/x-www-form-urlencoded';
export default function (url, data = {}, type = "GET") {
return new Promise((resolve, reject) => {
let promise;
// 获取token的相关信息
const token = store.state.token;
const refresh_token = store.state.refresh_token;
const token_time = store.state.token_time;
// axios 取消请求的相关参数
const CancelToken = axios.CancelToken;
const source = CancelToken.source();
// 每次请求都携带token
axios.defaults.headers.common['Authorization'] = token;
axios.defaults.headers.common['RefreshAuthorization'] = refresh_token;
// axios 请求拦截器
let myInterceptor = axios.interceptors.request.use(async config => {
// 白名单
let whitelist = [
'/login/',
'/update_token/',
];
if (token && whitelist.indexOf(config.url) == -1) {
let token_time_obj = new Date(token_time); // token 过期时间的时间戳
let this_time_obj = new Date(); // 当前时间的时间戳
// 如果当前时间大于token过期时间那么就刷新 token
if (token_time_obj.getTime() < this_time_obj.getTime()) {
// 请求刷新 token 的接口
let response = await axios.post('/update_token/');
let res = response.data;
if (res.code == '1007') {
this_vue.$store.commit(RECEIVE_TOKEN, {
token: res.data.token,
refresh_token: res.data.refresh_token,
token_time: res.data.token_time
});
config.headers.Authorization = res.data.token;
config.headers.RefreshAuthorization = res.data.refresh_token;
return config;
} else {
// 如果连 refresh_token 都过期,那么就需要重新登陆
config.cancelToken = source.token;
source.cancel('token已过期,请重新登陆');
this_vue.$router.replace({path: 'login'});
return config
}
} else {
return config;
}
} else {
return config;
}
}, error => {
// 对请求错误做些什么
return Promise.reject(error);
});
// 请求
if (type === 'GET') {
let dataStr = '';
Object.keys(data).forEach(key => {
dataStr += key + '=' + data[key] + '&';
});
if (dataStr !== '') {
dataStr = dataStr.substring(0, dataStr.lastIndexOf('&'));
url += '?' + dataStr;
}
promise = axios.get(url)
} else {
let type_lower = type.toLowerCase();
promise = axios[type_lower](url, data)
}
// 移除 axios 拦截器
axios.interceptors.request.eject(myInterceptor);
promise.then((response) => {
resolve(response.data);
}).catch((error) => {
resolve(error);
})
});
}